package cron

import (
	"gitee.com/zackeus/go-boot/cron/drive"
	"gitee.com/zackeus/go-zero/core/threading"
)

func (d *Cron) IsSteady() bool {
	d.mu.RLock()
	defer d.mu.RUnlock()

	return d.ctx != nil && d.ctx.Err() == nil && d.driver != nil && d.driver.Available()
}

// GetNodeID 获取节点ID
func (d *Cron) GetNodeID() string {
	return d.driver.NodeID()
}

// CheckJobAvailable 检查job是否可以在此节点中运行
func (d *Cron) CheckJobAvailable(jobName string) (bool, error) {
	d.mu.RLock()
	defer d.mu.RUnlock()

	if d.hashRing == nil {
		d.logger.Errorf("nodeID=%s, NodePool.nodes is nil", d.GetNodeID())
		return false, ErrNodePoolIsNil
	}
	if d.hashRing.IsEmpty() {
		/* 集群可用节点为空 */
		return false, nil
	}
	if !d.IsSteady() {
		/* 集群未就绪 */
		return false, ErrCronIsNotSteady
	}

	targetNode := d.hashRing.Get(jobName)
	if d.GetNodeID() == targetNode {
		/* 执行节点为当前节点 */
		d.logger.Infof("job %s, running in node: %s, nodeID is %s", jobName, targetNode, d.GetNodeID())
		return true, nil
	}
	return false, nil
}

// 接收事件
func (d *Cron) receiveLoop() {
	threading.GoSafeCtx(d.ctx, func() {
		for {
			select {
			case event, ok := <-d.events:
				if !ok {
					/* channel 已关闭 */
					d.logger.Info("cron node event channel has closed.")
					return
				}
				d.updateHashRing(event)
			case <-d.ctx.Done():
				return
			}
		}
	})
}

// 更新一致性hash
func (d *Cron) updateHashRing(event *drive.Event) {
	d.mu.Lock()
	defer d.mu.Unlock()

	switch event.Type {
	case drive.AddNodeEvent:
		/* 节点新增 */
		d.logger.Infof("add cron node: %v", event.Node)
		d.hashRing.Add(event.Node)
	case drive.DelNodeEvent:
		/* 节点删除 */
		d.logger.Infof("del cron node: %v", event.Node)
		d.hashRing.Del(event.Node)
	default:
		d.logger.Errorf("unknown event type: %v", event.Type)
	}
}
